package test

import (
	"fmt"
	"github.com/IBM/sarama"
	"src/config"
	"src/connection"
	"strconv"
	"time"
)

// kafkaClient holds the most recently created sarama client (set by connKafka).
var kafkaClient sarama.Client

// now is the process start time, used as the baseline for the elapsed-time logs.
var now = time.Now()

// NOTE(review), translated from the original comment: Kafka is faster than
// RabbitMQ, but it is more likely to lose data and may deliver duplicates.

// Topic is the Kafka topic used by both the producer and consumer side.
var Topic = "my_topic"

// partition is currently unused; kept from earlier manual-partitioner experiments.
var partition = int32(0)

// var b int64 = 0
//
// var wg sync.WaitGroup

// num2 counts messages received by the cllbake consumer callback.
var num2 = 0

// cllbake is the consumer callback: it counts received messages in the
// package-level num2 and, every 1000 messages, logs the elapsed time since
// the package-level start time `now`. Keys are produced as "BBB_<n>" (see
// push); the numeric suffix is parsed for logging only. The message is
// returned unchanged.
//
// NOTE(review): the name is a typo for "callback"; kept for compatibility
// with existing references.
func cllbake(s *sarama.PartitionConsumer, v *sarama.ConsumerMessage) *sarama.ConsumerMessage {
	str := string(v.Key)

	// BUG FIX(review): the original sliced str[4:] unconditionally (panic on
	// short keys) and discarded the Atoi error; guard and report -1 instead.
	seq := -1
	if len(str) > 4 {
		if i, err := strconv.Atoi(str[4:]); err == nil {
			seq = i
		}
	}

	num2++
	if num2%1000 == 0 {
		fmt.Println("接收订阅【耗时秒", (time.Now()).Unix()-now.Unix())
		fmt.Println("接收订阅【耗时毫秒", ((time.Now()).UnixNano()/1e6)-(now.UnixNano()/1e6))
		fmt.Println("接收订阅【", str, "】—————", seq, "—————接收订阅完毕—————————!", num2)
	}
	return v
}
// push sends count messages to Topic. Each message is keyed "BBB_<n>" with
// n = idx*count + i, so the hash partitioner in connKafka keeps a given key
// on one partition. Elapsed time since the package-level `now` is logged at
// the end. Errors are logged rather than silently swallowed.
func push(idx int) {
	fmt.Println("发送订阅【", idx, "】———————————————————!", time.Now())

	// BUG FIX(review): the original discarded connKafka's error with `_`.
	cli, err := connKafka()
	if err != nil {
		fmt.Println("push: connect kafka:", err)
		return
	}
	// BUG FIX(review): the original leaked one client per push call.
	defer cli.Close()

	const count = 1000
	for i := 0; i < count; i++ {
		message := &sarama.ProducerMessage{
			Topic: Topic,
			Key:   sarama.StringEncoder("BBB_" + strconv.Itoa(i+idx*count)),
			Value: sarama.StringEncoder(Topic + strconv.Itoa(i+idx*count)),
		}
		if err := KafkaProducer(cli, message); err != nil {
			// BUG FIX(review): the original returned silently here,
			// hiding the send failure.
			fmt.Println("push: send message", i, ":", err)
			return
		}
	}

	fmt.Println("发送订阅耗时秒【", idx, "】", (time.Now()).Unix()-now.Unix())
	fmt.Println("发送订阅耗时毫秒【", idx, "】", ((time.Now()).UnixNano()/1e6)-(now.UnixNano()/1e6))
}
// main initializes the YAML config, starts the consumer group in the
// background, and blocks forever. The producer fan-out (push) is currently
// disabled.
//
// NOTE(review): this file is `package test`, not `package main`, so this
// main cannot be built as a binary as-is — confirm the intended package.
func main() {
	fmt.Println("end【0 】——————————开始接收订阅消息—————————!")
	fmt.Println(now.UnixNano() / 1e6)
	fmt.Println(now.Unix())

	cfg := &config.Config{Name: "./config/config.yaml"}
	if err := cfg.InitConfig(); err != nil {
		// BUG FIX(review): the original only printed the error and kept
		// running with an uninitialized config; fail fast instead.
		fmt.Println(err)
		return
	}

	fmt.Println("发送订阅", "start_____________________")

	// Consume in the background; the callback-based single-partition
	// consumer is disabled in favor of the consumer group.
	//go connection.KafkaConsumer(Topic, cllbake)
	go connection.KafkaConsumerGroup()

	// Producer fan-out, currently disabled (the original wrapped this in an
	// immediately-invoked func around an empty loop — dead code, removed):
	//for i := 0; i < 50; i++ {
	//	go push(i)
	//}

	fmt.Println("发送订阅", "发送订阅完毕_____________________")

	// Block forever so the consumer goroutine keeps running.
	select {}
}
// connKafka creates a sarama client against the hard-coded broker list and
// stores it in the package-level kafkaClient. The producer side is configured
// to wait for all in-sync replicas (acks = -1), to report successes (required
// for the sync producer to receive results), and to hash-partition by key so
// messages with the same key stay ordered within one partition.
func connKafka() (sarama.Client, error) {
	// Renamed from `config` to avoid shadowing the imported config package.
	cfg := sarama.NewConfig()
	cfg.Producer.RequiredAcks = sarama.WaitForAll        // acks = -1: wait for all ISR replicas
	cfg.Producer.Return.Successes = true                 // required, otherwise the sync producer gets no results
	cfg.Producer.Partitioner = sarama.NewHashPartitioner // same key -> same partition, preserving order

	// NOTE(review): broker addresses are hard-coded; consider reading them
	// from the config package (e.g. kafka.brokerIpPort) instead.
	cli, err := sarama.NewClient([]string{"192.168.0.104:9092", "localhost:9292", "localhost:9392"}, cfg)
	if err != nil {
		// BUG FIX(review): the original panicked here, which made the error
		// return value dead code; propagate the error to the caller instead.
		return nil, err
	}
	kafkaClient = cli
	return cli, nil
}

// KafkaAsyncProducer publishes one message through an async producer created
// from the given client and waits for its acknowledgement. It returns the
// broker error, if any, and nil on success.
func KafkaAsyncProducer(client sarama.Client, message *sarama.ProducerMessage) error {
	p, err := sarama.NewAsyncProducerFromClient(client)
	if err != nil {
		return err
	}
	defer p.Close()

	p.Input() <- message

	// BUG FIX(review): the original used a select with `default` immediately
	// after queueing, so the result channels were essentially never ready yet:
	// errors were silently dropped and the Successes channel (enabled via
	// Producer.Return.Successes in connKafka) was never drained, which would
	// eventually block the producer. Wait for the outcome instead.
	select {
	case perr := <-p.Errors():
		fmt.Println("Produced message failure: ", perr)
		return perr
	case <-p.Successes():
		return nil
	}
}

// KafkaProducer publishes one message through a synchronous producer created
// from the given client and returns any construction or send error.
// (Used by push; the original "temporarily unused" comment was stale.)
func KafkaProducer(client sarama.Client, message *sarama.ProducerMessage) error {
	p, err := sarama.NewSyncProducerFromClient(client)
	if err != nil {
		return err
	}
	defer p.Close()

	// BUG FIX(review): the original discarded SendMessage's return values and
	// then re-checked the (always nil at that point) constructor error, so
	// send failures were never detected. Also fixes the go-vet violation
	// fmt.Printf("err:", err) — a Printf call with no format verbs.
	if _, _, err := p.SendMessage(message); err != nil {
		fmt.Println("send message error:", err)
		return err
	}
	return nil
}
